{
 "cells": [
  {
   "cell_type": "raw",
   "metadata": {},
   "source": [
    "---\n",
    "title: \"Model Serving with KServe and Tensorflow - MNIST Classification\"\n",
    "date: 2021-02-24\n",
    "type: technical_note\n",
    "draft: false\n",
    "---"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Model Serving with KServe and Tensorflow - MNIST Classification\n",
    "---\n",
    "*INPUT [ TRANSFORMER --> ENRICHED INPUT ] --> MODEL --> PREDICTION*\n",
    "\n",
    "<font color='red'> <h3>This notebook requires KServe to be installed</h3></font>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "> **NOTE:** It is assumed that a model called *mnist* is already available in Hopsworks. An example of training a model for the *MNIST handwritten digit classification problem* is available in `Jupyter/experiment/Tensorflow/mnist.ipynb`"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Model Serving on [Hopsworks](https://github.com/logicalclocks/hopsworks)\n",
    "\n",
    "![hops.png](../../../images/hops.png)\n",
    "\n",
    "### Hopsworks Machine Learning (HSML) library\n",
    "\n",
    "`hsml` is the library to interact with the Hopsworks Model Registry and Model Serving. The library makes it easy to export, manage and deploy models. To learn more about `hsml`, see the <a href=\"https://docs.hopsworks.ai/machine-learning-api/latest\">Hopsworks Model Management</a> docs."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Serve the MNIST classifier\n",
    "\n",
    "Tensorflow models can be serve directly on KServe without a custom predictor script. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Transformer script (Optional) \n",
    "\n",
    "To serve a model with a transformer, write a python script that implements the `Transformer` class and the methods `preprocess` and `postprocess`, like this:\n",
    "\n",
    "```python\n",
    "class Transformer(object):\n",
    "    def __init__(self):\n",
    "        \"\"\"Initialization code goes here\"\"\"\n",
    "        # NOTE: The env var ARTIFACT_FILES_PATH contains the path to the model artifact files\n",
    "\n",
    "    def preprocess(self, inputs):\n",
    "        \"\"\"Transform the request inputs. The object returned by this method will be used as model input.\"\"\"\n",
    "        return inputs\n",
    "\n",
    "    def postprocess(self, outputs):\n",
    "        \"\"\"Transform the predictions computed by the model before returning a response.\"\"\"\n",
    "        return outputs\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Create a connection to Hopsworks"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Connected. Call `.close()` to terminate connection gracefully.\n"
     ]
    }
   ],
   "source": [
    "import hsml\n",
    "\n",
    "# Connect with Hopsworks\n",
    "conn = hsml.connection()\n",
    "\n",
    "# Retrieve the model registry handle\n",
    "mr = conn.get_model_registry()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Query Model Registry for best mnist Model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Model name: mniste2e\n",
      "Model version: 1\n",
      "{'accuracy': '0.65625'}\n"
     ]
    }
   ],
   "source": [
    "MODEL_NAME=\"mniste2e\"\n",
    "\n",
    "# Get the best version of the model\n",
    "best_model = mr.get_best_model(MODEL_NAME, \"accuracy\", \"max\")\n",
    "\n",
    "print('Model name: ' + best_model.name)\n",
    "print('Model version: ' + str(best_model.version))\n",
    "print(best_model.training_metrics)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create Deployment for the Trained Model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create serving instance\n",
    "mnistclassifier = best_model.deploy(serving_tool=\"KSERVE\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "After the deployment has been created, you can find it in the Hopsworks UI by going to the \"Deployments\" tab. You can also use the class attributes or the `.describe()` method of a deployment object to access its metadata."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Deployment: mniste2e\n",
      "{\n",
      "    \"artifact_version\": 0,\n",
      "    \"batching_enabled\": false,\n",
      "    \"created\": \"2022-05-18T14:24:58.093Z\",\n",
      "    \"creator\": \"Admin Admin\",\n",
      "    \"id\": 6,\n",
      "    \"inference_logging\": \"ALL\",\n",
      "    \"kafka_topic_dto\": {\n",
      "        \"name\": \"CREATE\",\n",
      "        \"num_of_partitions\": 1,\n",
      "        \"num_of_replicas\": 1\n",
      "    },\n",
      "    \"model_name\": \"mniste2e\",\n",
      "    \"model_path\": \"/Projects/demo_ml_meb10000/Models/mniste2e\",\n",
      "    \"model_server\": \"TENSORFLOW_SERVING\",\n",
      "    \"model_version\": 1,\n",
      "    \"name\": \"mniste2e\",\n",
      "    \"predictor\": null,\n",
      "    \"predictor_resource_config\": {\n",
      "        \"cores\": 1,\n",
      "        \"gpus\": 0,\n",
      "        \"memory\": 1024\n",
      "    },\n",
      "    \"requested_instances\": 1,\n",
      "    \"serving_tool\": \"KSERVE\"\n",
      "}\n"
     ]
    }
   ],
   "source": [
    "print(\"Deployment: \" + mnistclassifier.name)\n",
    "mnistclassifier.describe()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Use a transformer to enrich model inputs (optional)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When creating a deployment for the trained model, you can attach a transformer by setting a custom transformer script."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# from hsml.transformer import Transformer\n",
    "\n",
    "# TRANSFORMER_SCRIPT_PATH = mr.project_path + \"/Jupyter/serving/kserve/tensorflow/transformer.py\" # or .ipynb\n",
    "\n",
    "# mnistclassifier = best_model.deploy(serving_tool=\"KSERVE\",\n",
    "#                                     transformer=Transformer(script_file=TRANSFORMER_SCRIPT_PATH))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Classify digits with the MNIST classifier"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Start Deployment"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "8abfac2d8de84948bf91d046f12bd73d",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "HBox(children=(FloatProgress(value=0.0, max=1.0), HTML(value='')))"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "mnistclassifier.start()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Send Prediction Requests to the Deployed Model"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For making inference requests you can use the `.predict()` method of the deployment metadata object."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'predictions': [[0.0, 0.0, 0.0, 0.0, 7.00596943e-14, 0.0, 0.0, 0.0, 0.0, 1.0]]}\n",
      "{'predictions': [[0.0, 0.0, 0.0, 0.0, 7.00596943e-14, 0.0, 0.0, 0.0, 0.0, 1.0]]}\n",
      "{'predictions': [[0.0, 0.0, 0.0, 0.0, 7.00596943e-14, 0.0, 0.0, 0.0, 0.0, 1.0]]}\n",
      "{'predictions': [[0.0, 0.0, 0.0, 0.0, 7.00596943e-14, 0.0, 0.0, 0.0, 0.0, 1.0]]}\n",
      "{'predictions': [[0.0, 0.0, 0.0, 0.0, 7.00596943e-14, 0.0, 0.0, 0.0, 0.0, 1.0]]}\n",
      "{'predictions': [[0.0, 0.0, 0.0, 0.0, 7.00596943e-14, 0.0, 0.0, 0.0, 0.0, 1.0]]}\n",
      "{'predictions': [[0.0, 0.0, 0.0, 0.0, 7.00596943e-14, 0.0, 0.0, 0.0, 0.0, 1.0]]}\n",
      "{'predictions': [[0.0, 0.0, 0.0, 0.0, 7.00596943e-14, 0.0, 0.0, 0.0, 0.0, 1.0]]}\n",
      "{'predictions': [[0.0, 0.0, 0.0, 0.0, 7.00596943e-14, 0.0, 0.0, 0.0, 0.0, 1.0]]}\n",
      "{'predictions': [[0.0, 0.0, 0.0, 0.0, 7.00596943e-14, 0.0, 0.0, 0.0, 0.0, 1.0]]}\n"
     ]
    }
   ],
   "source": [
    "for i in range(10):\n",
    "    data = { \"signature_name\": \"serving_default\", \"instances\": [best_model.input_example] }\n",
    "    predictions = mnistclassifier.predict(data)\n",
    "    print(predictions)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Monitor Prediction Requests and Responses using Kafka"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "from hops import kafka\n",
    "from confluent_kafka import Producer, Consumer, KafkaError"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Setup Kafka consumer and subscribe to the topic containing the prediction logs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "TOPIC_NAME = mnistclassifier.inference_logger.kafka_topic.name\n",
    "\n",
    "config = kafka.get_kafka_default_config()\n",
    "config['default.topic.config'] = {'auto.offset.reset': 'earliest'}\n",
    "consumer = Consumer(config)\n",
    "topics = [TOPIC_NAME]\n",
    "consumer.subscribe(topics)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Read the Kafka Avro schema from Hopsworks and setup an Avro reader"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "json_schema = kafka.get_schema(TOPIC_NAME)\n",
    "avro_schema = kafka.convert_json_schema_to_avro(json_schema)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Read messages from the Kafka topic, parse them with the Avro schema and print the results"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "INFO -> servingId: 6, modelName: mniste2e, modelVersion: 1,requestTimestamp: 1652884488, inferenceId:8e71249a-e6ee-4ed8-902a-448e1e2a6d44, messageType:response\n",
      "Predictions -> [[0.0, 0.0, 0.0, 0.0, 7.00596943e-14, 0.0, 0.0, 0.0, 0.0, 1.0]]\n",
      "\n",
      "INFO -> servingId: 6, modelName: mniste2e, modelVersion: 1,requestTimestamp: 1652884489, inferenceId:3ca12ec2-bb19-4999-9d1a-71165e95671b, messageType:response\n",
      "Predictions -> [[0.0, 0.0, 0.0, 0.0, 7.00596943e-14, 0.0, 0.0, 0.0, 0.0, 1.0]]\n",
      "\n",
      "INFO -> servingId: 6, modelName: mniste2e, modelVersion: 1,requestTimestamp: 1652884489, inferenceId:12cb9329-0868-4cc1-81a9-e3697c10d4e7, messageType:response\n",
      "Predictions -> [[0.0, 0.0, 0.0, 0.0, 7.00596943e-14, 0.0, 0.0, 0.0, 0.0, 1.0]]\n",
      "\n",
      "INFO -> servingId: 6, modelName: mniste2e, modelVersion: 1,requestTimestamp: 1652884490, inferenceId:beabb1b1-e3d5-4acd-858b-fdbffa2a3b25, messageType:response\n",
      "Predictions -> [[0.0, 0.0, 0.0, 0.0, 7.00596943e-14, 0.0, 0.0, 0.0, 0.0, 1.0]]\n",
      "\n",
      "INFO -> servingId: 6, modelName: mniste2e, modelVersion: 1,requestTimestamp: 1652884490, inferenceId:94e86c3b-8d86-434c-a4df-244d99999a9d, messageType:response\n",
      "Predictions -> [[0.0, 0.0, 0.0, 0.0, 7.00596943e-14, 0.0, 0.0, 0.0, 0.0, 1.0]]\n",
      "\n"
     ]
    }
   ],
   "source": [
    "import json\n",
    "\n",
    "PRINT_INSTANCES=False\n",
    "PRINT_PREDICTIONS=True\n",
    "\n",
    "for i in range(0, 10):\n",
    "    msg = consumer.poll(timeout=5.0)\n",
    "    if msg is not None:\n",
    "        value = msg.value()\n",
    "        try:\n",
    "            event_dict = kafka.parse_avro_msg(value, avro_schema)  \n",
    "            payload = json.loads(event_dict[\"payload\"])\n",
    "            \n",
    "            if (event_dict['messageType'] == \"request\" and not PRINT_INSTANCES) or \\\n",
    "                (event_dict['messageType'] == \"response\" and not PRINT_PREDICTIONS):\n",
    "                continue\n",
    "            \n",
    "            print(\"INFO -> servingId: {}, modelName: {}, modelVersion: {},\"\\\n",
    "                  \"requestTimestamp: {}, inferenceId:{}, messageType:{}\".format(\n",
    "                       event_dict[\"servingId\"],\n",
    "                       event_dict[\"modelName\"],\n",
    "                       event_dict[\"modelVersion\"],\n",
    "                       event_dict[\"requestTimestamp\"],\n",
    "                       event_dict[\"inferenceId\"],\n",
    "                       event_dict[\"messageType\"]))\n",
    "\n",
    "            if event_dict['messageType'] == \"request\":\n",
    "                print(\"Instances -> {}\\n\".format(payload['instances']))\n",
    "                \n",
    "            if event_dict['messageType'] == \"response\":\n",
    "                print(\"Predictions -> {}\\n\".format(payload['predictions']))\n",
    "\n",
    "        except Exception as e:\n",
    "            print(\"A message was read but there was an error parsing it\")\n",
    "            print(e)\n",
    "    else:\n",
    "        print(\"timeout.. no more messages to read from topic\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
],
"metadata": {
 "kernelspec": {
  "display_name": "Python",
  "language": "python",
  "name": "python3"
 },
 "language_info": {
  "codemirror_mode": {
   "name": "ipython",
   "version": 3
  },
  "file_extension": ".py",
  "mimetype": "text/x-python",
  "name": "python",
  "nbconvert_exporter": "python",
  "pygments_lexer": "ipython3",
  "version": "3.8.11"
 }
},
"nbformat": 4,
"nbformat_minor": 4
}